package com.yh.blink.clickhouse;

import java.sql.*;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author zhangwq
 * @date 2018/6/30 20:58
 */
public class ClickHouseJDBC implements Runnable{
    public static void main(String[] args) {
        String sqlCount = "SELECT a.shop_id as shopId -- 门店ID\n" +
                "    , a.shop_name as shopName -- 门店名称\n" +
                "    , a.partner_id as partnerId -- 配送员ID\n" +
                "    , partner_name as partnerName -- 配送员名称\n" +
                "    , sum(order_cnt) AS orderCnt -- 配送订单\n" +
                "    , sum(timeout_order_cnt) as timeoutOrderCnt -- 超时订单\n" +
                "    , sum(duration_avg) as durationAvg -- 平均配送时长\n" +
                "    , round(sum(timeout_order_cnt_rate),4) as timeoutOrderCntRate -- 订单超时率\n" +
                "    , sum(order_cnt_month) AS orderCntMonth -- 配送订单（月）\n" +
                "    , sum(timeout_order_cnt_month) as timeoutOrderCntMonth -- 超时订单（月）\n" +
                "    , sum(duration_avg_month) as durationAvgMonth -- 平均配送时长（月）\n" +
                "    , round(sum(timeout_order_cnt_rate_month),4) as timeoutOrderCntRateMonth -- 订单超时率（月）\n" +
                "FROM ( \n" +
                "\n" +
                "    SELECT\n" +
                "      toString(today()) as dim_date_id,\n" +
                "      a.shop_id,\n" +
                "      a.shop_name,  \n" +
                "      a.partner_id,\n" +
                "      max(a.partner_name) as partner_name,\n" +
                "      countDistinct(delivery_order_id) AS order_cnt,\n" +
                "      countDistinct(multiIf(is_later = 1, delivery_order_id, NULL )) as timeout_order_cnt,\n" +
                "      round(sum(delivery_duration)/countDistinct(delivery_order_id),2) AS duration_avg,\n" +
                "      round(countDistinct(multiIf(is_later = 1, delivery_order_id, NULL ))/countDistinct(delivery_order_id),2)  as timeout_order_cnt_rate,\n" +
                "      0 as order_cnt_month, -- 配送订单（月）\n" +
                "      0 as timeout_order_cnt_month ,-- 超时订单（月）\n" +
                "      0 as duration_avg_month\t,-- 平均配送时长（月）\n" +
                "      0 as timeout_order_cnt_rate_month  -- 订单超时率（月）\n" +
                "  FROM (\n" +
                "          select shop_id \n" +
                "              , shop_name \n" +
                "              , partner_id\n" +
                "              , partner_name\n" +
                "              , delivery_finish_at\n" +
                "              , delivery_order_id\n" +
                "              , delivery_start_at\n" +
                "              , (toDateTime(if(length(delivery_finish_at) > 10,delivery_finish_at,CAST(toDateTime(delivery_start_at) + 3600, 'String'))) - toDateTime(delivery_start_at)) / 60 AS delivery_duration\n" +
                "              , toDateTime(if(length(delivery_finish_at) < 10,time_slot_date_to_date,delivery_finish_at)) > toDateTime(time_slot_date_to_date) AS is_later\n" +
                "          from \n" +
                "          ( select order_id\n" +
                "            , delivery_order_id\n" +
                "            , shop_id\n" +
                "            , shop_name \n" +
                "            , partner_id\n" +
                "            , argMax(partner_name, now_time) as partner_name \n" +
                "            , argMax(delivery_finish_at, now_time) as delivery_finish_at\n" +
                "            , concat(argMax(time_slot_date_to, now_time), ':00') as time_slot_date_to_date\n" +
                "            , argMax(delivery_start_at, now_time) as delivery_start_at\n" +
                "            from cluster_rpt_yhdj.rpt_order_delivery_realtime_test \n" +
                "            where event_date = (today() - 1) \n" +
                "              and shop_id = '9L07'\n" +
                "              and partner_id = '36279' \n" +
                "            group by order_id, delivery_order_id, shop_id,shop_name, partner_id\n" +
                "          ) as m1\n" +
                "          where (length(delivery_start_at) > 10)\n" +
                "              AND (length(time_slot_date_to_date) > 10)\n" +
                "      ) AS a\n" +
                "    WHERE\n" +
                "      substring(a.delivery_finish_at, 1, 10) = substring(CAST(now(), 'String'), 1, 10)\n" +
                "    GROUP BY  a.shop_id,\n" +
                "              a.shop_name,\n" +
                "              a.partner_id\n" +
                "              \n" +
                "  UNION ALL \n" +
                "  \n" +
                "      select dim_date_id\n" +
                "         , shop_id\n" +
                "         , shop_name \n" +
                "         , partner_id\n" +
                "         , partner_name\n" +
                "         , 0 as order_cnt\n" +
                "         , 0 as timeout_order_cnt\n" +
                "         , 0 as duration_avg\n" +
                "         , 0 as timeout_order_cnt_rate\n" +
                "         , toInt64(order_cnt_month) as order_cnt_month \n" +
                "         , toInt64(timeout_order_cnt_month) as timeout_order_cnt_month \n" +
                "         , duration_avg_month\n" +
                "         , timeout_order_cnt_rate_month\n" +
                "      from (\n" +
                "            select dim_date_id\n" +
                "                  , shop_id\n" +
                "                  , shop_name\n" +
                "                  , partner_id\n" +
                "                  , partner_name\n" +
                "                  , order_cnt as order_cnt_month\n" +
                "                  , timeout_order_cnt as timeout_order_cnt_month\n" +
                "                  , round(duration_avg,2) as duration_avg_month\n" +
                "                  , timeout_order_cnt/toFloat64(order_cnt) AS timeout_order_cnt_rate_month\n" +
                "                  FROM outside_mysql.mysql107kpi_delivery_partner_report_1\n" +
                "                  WHERE dim_date_id = toString(today()-2)\n" +
                "                AND dim_id = 3\n" +
                "            ) \n" +
                "     where partner_id = '36279'\n" +
                "       and shop_id = '9L07'\n" +
                ") as a\n" +
                "GROUP BY a.shop_id-- 门店ID\n" +
                "       , a.shop_name\n" +
                "       , a.partner_id\n" +
                "       , partner_name";//查询ontime数据量
//        exeSql(sqlDB);
//        exeSql(sqlTab);


        ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 400, 400, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
        for(int i=0;i<4;i++){
            ClickHouseJDBC myTask = new ClickHouseJDBC(sqlCount);
            executor.execute(myTask);
            System.out.println("线程池中线程数目："+executor.getPoolSize()+"，队列中等待执行的任务数目："+
                    executor.getQueue().size()+"，已执行玩别的任务数目："+executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }
    private String sql ;
    public ClickHouseJDBC(String sql){
       this.sql=sql;
    }
    public  void exeSql(String sql){
//        String address = "jdbc:clickhouse://10.9.60.2:60123/default?max_execution_time=70";
        String address = "jdbc:clickhouse://10.9.60.2:8123/default?socket_timeout=4000000";
        Connection connection = null;
        Statement statement = null;
        ResultSet results = null;
        try {
            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
//            connection = DriverManager.getConnection(address,"ycd_write","dUhKokKG");
            Properties pros = new Properties();
            pros.setProperty("user","ycd_write");
            pros.setProperty("password","dUhKokKG");
//            pros.setProperty("max_execution_time","50000");
//            pros.setProperty("socket_timeout","4000000");
            connection = DriverManager.getConnection(address,pros);
            statement = connection.createStatement();

            long begin = System.currentTimeMillis();
            results = statement.executeQuery(sql);
            long end = System.currentTimeMillis();
            System.out.println("执行（）耗时："+(end-begin)/1000 +"s");
            ResultSetMetaData rsmd = results.getMetaData();
            List<Map> list = new ArrayList();
            while(results.next()){
                Map map = new HashMap();
                for(int i = 1;i<=rsmd.getColumnCount();i++){
                    map.put(rsmd.getColumnName(i),results.getString(rsmd.getColumnName(i)));
                }
                list.add(map);
            }
            for(Map map : list){
                System.err.println(map);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {//关闭连接
            try {
                if(results!=null){
                    results.close();
                }
                if(statement!=null){
                    statement.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void run() {
        exeSql(this.sql);
    }
}
